0ae2a3851cbfe1ec2f2c7237954b18c9951c76a3,runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java,FlinkMergingNonShuffleReduceFunction,reduce,#Iterable#Collector#,93

Before Change


      Iterable<WindowedValue<KV<K, InputT>>> elements,
      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {

    FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
        new FlinkSingleOutputProcessContext<>(
            serializedOptions.getPipelineOptions(),
            getRuntimeContext(),
            doFn,
            windowingStrategy,
            sideInputs, out
        );

    OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
        OldPerKeyCombineFnRunners.create(combineFn);

    @SuppressWarnings("unchecked")
    OutputTimeFn<? super BoundedWindow> outputTimeFn =
        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();

    // get all elements so that we can sort them, has to fit into
    // memory
    // this seems very unprudent, but correct, for now
    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
    for (WindowedValue<KV<K, InputT>> inputValue: elements) {
      for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) {
        sortedInput.add(exploded);
      }
    }
    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
      @Override
      public int compare(
          WindowedValue<KV<K, InputT>> o1,
          WindowedValue<KV<K, InputT>> o2) {
        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
      }
    });

    // merge windows, we have to do it in an extra pre-processing step and
    // can't do it as we go since the window of early elements would not
    // be correct when calling the CombineFn
    mergeWindow(sortedInput);

    // iterate over the elements that are sorted by window timestamp
    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();

    // create accumulator using the first elements key
    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
    K key = currentValue.getValue().getKey();
    IntervalWindow currentWindow =
        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
    InputT firstValue = currentValue.getValue().getValue();
    processContext.setWindowedValue(currentValue);
    AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
    accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);

    // we use this to keep track of the timestamps assigned by the OutputTimeFn
    Instant windowTimestamp =
        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);

    while (iterator.hasNext()) {
      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
      IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());

      if (currentWindow.equals(nextWindow)) {
        // continue accumulating and merge windows

        InputT value = nextValue.getValue().getValue();
        processContext.setWindowedValue(nextValue);
        accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);

        windowTimestamp = outputTimeFn.combine(
            windowTimestamp,
            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));

      } else {
        // emit the value that we currently have
        out.collect(
            WindowedValue.of(
                KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
                windowTimestamp,
                currentWindow,
                PaneInfo.NO_FIRING));

        currentWindow = nextWindow;
        InputT value = nextValue.getValue().getValue();
        processContext.setWindowedValue(nextValue);
        accumulator = combineFnRunner.createAccumulator(key, processContext);
        accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);

After Change


      Iterable<WindowedValue<KV<K, InputT>>> elements,
      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {

    PipelineOptions options = serializedOptions.getPipelineOptions();

    FlinkSideInputReader sideInputReader =
        new FlinkSideInputReader(sideInputs, getRuntimeContext());

    PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
        PerKeyCombineFnRunners.create(combineFn);

    @SuppressWarnings("unchecked")
    OutputTimeFn<? super BoundedWindow> outputTimeFn =
        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();

    // get all elements so that we can sort them, has to fit into
    // memory
    // this seems very unprudent, but correct, for now
    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
        sortedInput.add(exploded);
      }
    }
    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
      @Override
      public int compare(
          WindowedValue<KV<K, InputT>> o1,
          WindowedValue<KV<K, InputT>> o2) {
        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
      }
    });

    // merge windows, we have to do it in an extra pre-processing step and
    // can't do it as we go since the window of early elements would not
    // be correct when calling the CombineFn
    mergeWindow(sortedInput);

    // iterate over the elements that are sorted by window timestamp
    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();

    // create accumulator using the first elements key
    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
    K key = currentValue.getValue().getKey();
    IntervalWindow currentWindow =
        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
    InputT firstValue = currentValue.getValue().getValue();
    AccumT accumulator =
        combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows());
    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
        options, sideInputReader, currentValue.getWindows());

    // we use this to keep track of the timestamps assigned by the OutputTimeFn
    Instant windowTimestamp =
        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);

    while (iterator.hasNext()) {
      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
      IntervalWindow nextWindow =
          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());

      if (currentWindow.equals(nextWindow)) {
        // continue accumulating and merge windows

        InputT value = nextValue.getValue().getValue();
        accumulator = combineFnRunner.addInput(key, accumulator, value,
            options, sideInputReader, currentValue.getWindows());

        windowTimestamp = outputTimeFn.combine(
            windowTimestamp,
            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));

      } else {
        // emit the value that we currently have
        out.collect(
            WindowedValue.of(
                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
                    options, sideInputReader, currentValue.getWindows())),
                windowTimestamp,
                currentWindow,
                PaneInfo.NO_FIRING));

        currentWindow = nextWindow;
        currentValue = nextValue;
        InputT value = nextValue.getValue().getValue();
        accumulator = combineFnRunner.createAccumulator(key,
            options, sideInputReader, currentValue.getWindows());
        accumulator = combineFnRunner.addInput(key, accumulator, value,
            options, sideInputReader, currentValue.getWindows());
        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
      }